package day05;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

/**
 * Flink Table API 与 SQL —— 常见 api
 *
 * @author lvbingbing
 * @date 2022-01-13 15:30
 */
/**
 * Flink Table API &amp; SQL — common APIs.
 *
 * <p>Walks through the different ways of creating a table environment (legacy planner vs. Blink,
 * streaming vs. batch), registers a CSV-backed table from the file system, and runs equivalent
 * queries through both the Table API and plain SQL.
 *
 * @author lvbingbing
 * @date 2022-01-13 15:30
 */
public class FlinkTableApi01 {
    public static void main(String[] args) throws Exception {
        // 1. Obtain the streaming execution environment; run with parallelism 1 so the
        //    console output of the various print sinks stays readable.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);
        // 2. Demonstrate the common Table API / SQL calls.
        studyCommonApi(streamEnv);
        // 3. Trigger job execution (nothing runs until execute() is called).
        streamEnv.execute();
    }

    /**
     * Demonstrates table-environment creation variants, table registration via the
     * file-system connector, and equivalent Table API / SQL queries.
     *
     * @param streamEnv the streaming execution environment the table programs attach to
     */
    private static void studyCommonApi(StreamExecutionEnvironment streamEnv) {
        // 1. Create a table environment (default settings).
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        // 1.1 Streaming with the legacy planner.
        EnvironmentSettings legacyStreamSettings =
                EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamTableEnvironment legacyStreamTableEnv =
                StreamTableEnvironment.create(streamEnv, legacyStreamSettings);

        // 1.2 Batch with the legacy planner (uses the DataSet-based execution environment).
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment legacyBatchTableEnv = BatchTableEnvironment.create(batchEnv);

        // 1.3 Streaming with the Blink planner.
        EnvironmentSettings blinkStreamSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment blinkStreamTableEnv =
                StreamTableEnvironment.create(streamEnv, blinkStreamSettings);

        // 1.4 Batch with the Blink planner.
        EnvironmentSettings blinkBatchSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings);

        // 2. Create a table by connecting to an external system: a CSV file on disk.
        tableEnv.connect(new FileSystem().path("input/sensor.txt"))
                .withFormat(new Csv())
                .withSchema(
                        new Schema()
                                .field("id", DataTypes.STRING())
                                .field("timestamp", DataTypes.BIGINT())
                                .field("temp", DataTypes.DOUBLE()))
                .createTemporaryTable("inputTable");
        Table inputTable = tableEnv.from("inputTable");
        inputTable.printSchema();
        tableEnv.toAppendStream(inputTable, Row.class).print("inputTableRes");

        // 3. Query / transform.
        // 3.1 Table API: projection plus filter, expressed with the string expression DSL.
        Table resultTable = inputTable.select("id, temp").filter("id === 'sensor_6'");
        tableEnv.toAppendStream(resultTable, Row.class).print("result");
        // Grouped aggregation: per-sensor record count and average temperature. Aggregates
        // update as new rows arrive, so a retract stream is required instead of append.
        Table aggTable =
                inputTable.groupBy("id").select("id, id.count as count, temp.avg as avgTemp");
        tableEnv.toRetractStream(aggTable, Row.class).print("aggResult");

        // 3.2 Flink SQL: the same two queries expressed as SQL strings.
        Table sqlQueryRes =
                tableEnv.sqlQuery("select id, temp from inputTable where id = 'sensor_6'");
        tableEnv.toAppendStream(sqlQueryRes, Row.class).print("sqlQueryResult");
        Table aggSqlRes =
                tableEnv.sqlQuery(
                        "select id, count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");
        tableEnv.toRetractStream(aggSqlRes, Row.class).print("aggSqlQueryResult");
    }
}
